---
seotitle: Developing Streaming APIs
seodesc: Learn how to create services that stream data.
title: Streaming APIs
subtitle: How to create APIs that stream data
lang: ts
---

Encore makes it easy to create API endpoints that can stream data to and from your applications.

<GitHubLink
    href="https://github.com/encoredev/examples/tree/main/ts/streaming-chat"
    desc="Simple chat app using the Streaming API to create a WebSocket stream from a web frontend."
/>

## Different kinds of stream

Encore supports three types of streams, each designed for a specific data flow direction:
- [**StreamIn**](#streamin): When you need to stream data into your service.
- [**StreamOut**](#streamout): When you need to stream data out from your service.
- [**StreamInOut**](#streaminout): When you need to stream data into and out of your service.

## How it works

When you connect to a streaming API endpoint, the client and server perform a handshake in the form of an HTTP request. If the server accepts the handshake request, a stream is returned to both the client and the API handler. Under the hood, the stream is a WebSocket over which messages can be sent and received.

Path parameters, query parameters, and headers can be passed via the handshake request. The streams returned to the client and to the API handler are typed with the incoming and outgoing message types that you specify in your API.

## Defining streaming APIs

Similar to how you can define [RESTful API endpoints](/docs/ts/primitives/defining-apis) with Encore, you can also define type-safe streaming API endpoints. They accept a handshake type and, depending on the stream direction, an incoming message type, an outgoing message type, or both. The type parameters are required for Encore to understand your API.

If you don't need any data from the handshake, you can omit that type and specify only the message types.

<GitHubLink
    href="https://github.com/encoredev/examples/tree/main/ts/streaming"
    desc="Example showcasing all the different streaming APIs: api.streamIn, api.streamOut, and api.streamInOut"
/>

### StreamIn

Use `api.streamIn` when you want a stream from the client to the server, for example when uploading data from the client:

```typescript
import { api } from "encore.dev/api";
import log from "encore.dev/log";

// Used to pass initial data, optional.
interface Handshake {
  user: string;
}

// What the client sends over the stream.
interface Message {
  data: string;
  done: boolean;
}

// Returned when the stream is done, optional.
interface Response {
  success: boolean;
}

export const uploadStream = api.streamIn<Handshake, Message, Response>(
  { path: "/upload", expose: true },
  async (handshake, stream) => {
    const chunks: string[] = [];
    try {
      // The stream object is an AsyncIterator that yields incoming messages.
      for await (const data of stream) {
        chunks.push(data.data);
        // Stop the stream if the client sends a "done" message
        if (data.done) break;
      }
    } catch (err) {
      log.error(`Upload error by ${handshake.user}:`, err);
      return { success: false };
    }
    log.info(`Upload complete by ${handshake.user}`);
    return { success: true };
  },
);
```

For `api.streamIn` you need to specify the incoming message type; the handshake type is optional. You can also specify an optional outgoing type if your API handler responds with some data when it is done with the incoming stream.

```ts
api.streamIn<Handshake, Incoming, Outgoing>(
  {...}, async (handshake, stream): Promise<Outgoing> => {...})
```

```ts
api.streamIn<Handshake, Incoming>(
  {...}, async (handshake, stream) => {...})
```

```ts
api.streamIn<Incoming, Outgoing>(
  {...}, async (stream): Promise<Outgoing> => {...})
```

```ts
api.streamIn<Incoming>(
  {...}, async (stream) => {...})
```
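
Because the incoming stream is consumed with `for await`, the accumulation logic of a `streamIn` handler can be factored into a plain function and tried out without a running Encore app. The following is a minimal sketch under the assumption that the handler's stream can be passed wherever an `AsyncIterable<Message>` is expected; `collectUpload` and `fakeClient` are hypothetical names introduced for illustration, not part of Encore's API.

```typescript
// Same shape as the Message interface in the upload example above.
interface Message {
  data: string;
  done: boolean;
}

// Hypothetical helper: drains any async iterable of messages until a
// message arrives with done set to true, then returns the joined data.
async function collectUpload(stream: AsyncIterable<Message>): Promise<string> {
  const chunks: string[] = [];
  for await (const msg of stream) {
    chunks.push(msg.data);
    if (msg.done) break;
  }
  return chunks.join("");
}

// Hypothetical stand-in for a connected client, for local testing only.
async function* fakeClient(): AsyncGenerator<Message> {
  yield { data: "hello ", done: false };
  yield { data: "world", done: true };
  yield { data: "ignored", done: false }; // never read: the loop breaks first
}
```

Inside a handler, such a helper could be called as `const body = await collectUpload(stream);`.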

### StreamOut

Use `api.streamOut` when you want a stream of messages from the server to the client, for example when streaming logs from the server:

```typescript
import { api, StreamOut } from "encore.dev/api";
import log from "encore.dev/log";

// Used to pass initial data, optional.
interface Handshake {
  rows: number;
}

// What the server sends over the stream.
interface Message {
  row: string;
}

export const logStream = api.streamOut<Handshake, Message>(
  { path: "/logs", expose: true },
  async (handshake, stream) => {
    try {
      for await (const row of mockedLogs(handshake.rows, stream)) {
        // Send the message to the client
        await stream.send({ row });
      }
    } catch (err) {
      log.error("Log stream error:", err);
    }
  },
);

// This function generates an async iterator that yields mocked log rows
async function* mockedLogs(rows: number, stream: StreamOut<Message>) {
  for (let i = 0; i < rows; i++) {
    yield new Promise<string>((resolve) => {
      setTimeout(() => {
        resolve(`Log row ${i + 1}`);
      }, 500);
    });
  }

  // Close the stream when all logs have been sent
  await stream.close();
}
```

For `api.streamOut` you need to specify the outgoing message type; the handshake type is optional.

```ts
api.streamOut<Handshake, Outgoing>(
  {...}, async (handshake, stream) => {...})
```

```ts
api.streamOut<Outgoing>(
  {...}, async (stream) => {...})
```
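
The sending side can be sketched in a similar way. The `logStream` handler above only relies on `send` and `close`, so the sending logic can be written against a small interface and tried out with an in-memory stand-in. `Sender`, `sendRows`, and `recordingSender` below are hypothetical names for illustration; the interface is an assumption modeled on how `send` and `close` are awaited in the example above, not Encore's actual type definition.

```typescript
interface Message {
  row: string;
}

// Hypothetical minimal view of an outgoing stream: just the two methods
// the handler needs.
interface Sender<T> {
  send(msg: T): Promise<void>;
  close(): Promise<void>;
}

// Sends each row in order and closes the stream afterwards, even if a
// send fails. Returns the number of rows that were sent.
async function sendRows(rows: string[], stream: Sender<Message>): Promise<number> {
  let sent = 0;
  try {
    for (const row of rows) {
      await stream.send({ row });
      sent++;
    }
  } finally {
    await stream.close();
  }
  return sent;
}

// In-memory stand-in that records what was sent, for local testing only.
function recordingSender() {
  const messages: Message[] = [];
  let closed = false;
  const stream: Sender<Message> = {
    send: async (msg) => {
      messages.push(msg);
    },
    close: async () => {
      closed = true;
    },
  };
  return { stream, messages, isClosed: () => closed };
}
```

Closing in a `finally` block keeps the close call in one place instead of inside the generator that produces the rows.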

### StreamInOut

Use `api.streamInOut` when you want to stream messages in both directions, for example if you are building a chat application:

```typescript
import { api } from "encore.dev/api";

interface InMessage {
  // ...
}

interface OutMessage {
  // ...
}

export const ChatStream = api.streamInOut<InMessage, OutMessage>(
  { path: "/chat", expose: true },
  async (stream) => {
    for await (const chatMessage of stream) {
      // Respond to the message by sending something back
      await stream.send({ /* ... */ })
    }
  }
);
```

For `api.streamInOut` you need to specify both the incoming and outgoing message types; the handshake type is optional.

```ts
api.streamInOut<Handshake, Incoming, Outgoing>(
  {...}, async (handshake, stream) => {...})
```

```ts
api.streamInOut<Incoming, Outgoing>(
  {...}, async (stream) => {...})
```


## Handshake

When you connect to a streaming API endpoint, the client and server perform a handshake in the form of an HTTP request. For all stream types, the handshake type is optional and is only needed when you want data from the initial request, such as path parameters, query parameters, or headers.

Note that if you specify a handshake type, your handler receives two arguments: the handshake data and the stream. If you omit the handshake type, the handler receives only the stream.

## Requiring authentication

You can use your `authHandler` in the same way as for regular endpoints: just specify `auth: true` in your endpoint options. The auth data will be passed from the client to the server via query parameters or headers in the initial handshake request.

After a request has been successfully authenticated, you can access authentication data passed from the `authHandler` by calling `getAuthData()`. See more details in the [auth handler docs](/docs/ts/develop/auth#authentication-handlers).

## Broadcasting messages

To broadcast messages to all connected clients, you can store the streams in a map and iterate over them when a new message is received. If a client disconnects, you can remove the stream from the map.

```ts
import { api, StreamInOut } from "encore.dev/api";

// Map to hold all connected streams
const connectedStreams: Map<
  string,
  StreamInOut<ChatMessage, ChatMessage>
> = new Map();

// Object sent from the client to the server when establishing a connection
interface HandshakeRequest {
  id: string;
}

// Object sent by both server and client
interface ChatMessage {
  username: string;
  msg: string;
}

export const chat = api.streamInOut<HandshakeRequest, ChatMessage, ChatMessage>(
  { expose: true, auth: false, path: "/chat" },
  async (handshake, stream) => {
    connectedStreams.set(handshake.id, stream);

    try {
      // The stream object is an AsyncIterator that yields incoming messages.
      // The loop will continue as long as the client keeps the connection open.
      for await (const chatMessage of stream) {
        for (const [key, val] of connectedStreams) {
          try {
            // Send the user's message to all connected clients.
            await val.send(chatMessage);
          } catch (err) {
            // If there is an error sending the message, remove the client from the map.
            connectedStreams.delete(key);
          }
        }
      }
    } catch (err) {
      // If there is an error reading from the stream, remove the client from the map.
      connectedStreams.delete(handshake.id);
    }

    // When the client disconnects, remove them from the map.
    connectedStreams.delete(handshake.id);
  },
);
```
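
The fan-out loop in the handler above can also be isolated into its own function, which makes the "remove clients that fail" behavior easy to test. A minimal sketch, assuming each stored stream exposes an async `send` method as used above; `Sender` and `broadcast` are hypothetical names, not Encore APIs.

```typescript
interface ChatMessage {
  username: string;
  msg: string;
}

// Hypothetical minimal view of a connected stream: only send() is needed.
interface Sender<T> {
  send(msg: T): Promise<void>;
}

// Sends msg to every client in the map and removes clients whose send
// fails. Returns the ids of the clients that were removed.
async function broadcast<T>(
  clients: Map<string, Sender<T>>,
  msg: T,
): Promise<string[]> {
  const removed: string[] = [];
  // Iterate over a snapshot so that deleting entries during the loop
  // cannot affect the iteration.
  for (const [id, client] of Array.from(clients)) {
    try {
      await client.send(msg);
    } catch {
      clients.delete(id);
      removed.push(id);
    }
  }
  return removed;
}
```

In the chat handler, the inner loop could then be replaced by `await broadcast(connectedStreams, chatMessage);`.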

## Connecting with the client

Using the [generated client](/docs/ts/cli/client-generation), you can connect to any streaming API endpoint that has `expose` set to `true`. The client stream acts as an async iterator, allowing you to retrieve messages by simply iterating over it:

```typescript
const stream = await client.serviceName.endpointName();
for await (const msg of stream) {
  // Do something with each message
}
```

To send messages to the service, use the async `send` method:

```typescript
const stream = await client.serviceName.endpointName();
await stream.send({ ... });
```

To handle network errors or do some cleanup after the connection is closed, you can attach event listeners on the underlying socket:

```typescript
const stream = await client.serviceName.endpointName();

stream.socket.on("error", (event) => {
  // An error occurred
});

stream.socket.on("close", (event) => {
  // Connection was closed
});
```

## Service to service streaming

As with [other endpoint types](/docs/ts/primitives/api-calls), you can use streaming between services by importing `~encore/clients`.
If you want the stream to be reachable only by other services (and not from the public internet), set the `expose` option to `false`.

Example of using a stream endpoint from a regular API endpoint:

```typescript
import { api } from "encore.dev/api";
import { chat } from "~encore/clients"; // import 'chat' service

export const myOtherAPI = api({}, async (): Promise<void> => {
  const stream = await chat.myStreamingEndpoint();

  // send a message to the chat service over the stream
  await stream.send({ msg: "data" });

  for await (const msg of stream) {
    // handle incoming message
  }
});
```

